;; CHANNELS

;; channel-receive: The channel on which the work-distributor receives all of
;; its messages: work published from the outer context and ready notifications
;; from the workers. It is created in pool-initializer and given to all workers,
;; so that they can report themselves as ready to receive more work.

;; channel-return: A channel created by each call to the publish procedure and
;; sent to the work-distributor as part of the work message. The distributor
;; passes it on to a worker, which puts the result of the completed work on
;; this channel, so that the outer context can receive it.

(define-module (fibers-pool))


(use-modules
 ;; FIFO queue, not functional, using mutation
 ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html
 (ice-9 q)
 (ice-9 match)
 (ice-9 threads)
 (rnrs exceptions)
 (rnrs conditions)
 ;; fibers internals are needed for creating schedulers without running anything
 ;; in them immediately
 (fibers)
 (fibers channels)
 (fibers internal))


;; Write MSG to the current output port using ~display~, followed by a
;; newline.
(define (displayln msg)
  (let ([out (current-output-port)])
    (display msg out)
    (newline out)))


;; Message loop of the pool's dispatcher.
;;
;; CHANNEL-RECEIVE is the channel on which all messages arrive. Two kinds of
;; messages are understood:
;;
;;   (worker-ready . CHANNEL) -- a worker reports that it can take work and
;;                               hands over the channel on which it listens.
;;   (work . WORK)            -- WORK is forwarded to a worker unchanged; the
;;                               worker expects a pair of a thunk and a return
;;                               channel.
;;
;; Work that arrives while no worker is ready is queued, and so are ready
;; workers while there is no work. Any other message raises a condition that
;; carries the offending message as irritant. The loop has no exit branch
;; besides that condition.
(define (work-distributor channel-receive)
  ;; Both queues are mutable FIFO queues and live as long as the loop.
  (define pending-work (make-q))
  (define idle-workers (make-q))

  ;; Print the current state of both queues.
  (define (report-status)
    (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages")
    (display "[WORK-DISTRIBUTOR]: number of ready workers in queue: ")
    (displayln (q-length idle-workers))
    (display "[WORK-DISTRIBUTOR]: number of works in queue: ")
    (displayln (q-length pending-work)))

  ;; Send JOB, tagged as work, to the worker listening on WORKER-CHANNEL.
  (define (hand-over worker-channel job)
    (put-message worker-channel (cons 'work job)))

  (let next-message ()
    (report-status)
    (match (pk 'work-distributor-received-msg (get-message channel-receive))
      [('worker-ready . worker-channel)
       (displayln "[WORK-DISTRIBUTOR]: work-distributor received ready worker channel")
       ;; Park the worker when there is nothing to do, otherwise give it the
       ;; oldest queued work.
       (if (q-empty? pending-work)
           (enq! idle-workers worker-channel)
           (hand-over worker-channel (deq! pending-work)))
       (next-message)]
      [('work . job)
       ;; Queue the work when nobody is ready, otherwise give it to the worker
       ;; that has been waiting the longest.
       (if (q-empty? idle-workers)
           (enq! pending-work job)
           (hand-over (deq! idle-workers) job))
       (next-message)]
      ;; Anything else is a protocol violation.
      [unknown
       (raise
        (condition
         (make-error)
         (make-message-condition "work-distributor received unrecognized message")
         (make-irritants-condition (list unknown))))])))


;; Body of a single pool worker.
;;
;; WORKER-INDEX is accepted but not used by the body. CHANNEL-RECEIVE is the
;; channel of the work-distributor, on which the worker announces itself.
;;
;; The worker creates one channel of its own and then repeats forever:
;; announce readiness (handing over its own channel), wait for a message on
;; that channel, run the received thunk and put its result on the return
;; channel that came with it. Any message that is not tagged as work raises a
;; condition carrying the message as irritant.
(define (worker worker-index channel-receive)
  (define own-channel (make-channel))

  ;; Tell the distributor where to send the next piece of work.
  (define (announce-ready)
    (put-message channel-receive (cons 'worker-ready own-channel)))

  (displayln "[WORKER]: before worker message loop")
  (let serve ()
    (announce-ready)
    (match (pk 'worker-got-msg (get-message own-channel))
      ;; A work message is the tag followed by a pair of thunk and return
      ;; channel.
      [('work job . reply-channel)
       (let ([result (job)])
         ;; Whoever holds the return channel can pick up the result.
         (put-message reply-channel result))
       (serve)]
      ;; Anything else is a protocol violation.
      [unknown
       (raise
        (condition
         (make-error)
         (make-message-condition "worker received unrecognized message")
         (make-irritants-condition (list unknown))))])))


;; Set up a pool and return the channel on which work can be published.
;;
;; Keyword arguments:
;;   #:parallelism -- number of worker fibers to spawn; it is also passed to
;;                    ~make-scheduler~. Defaults to (current-processor-count).
;;
;; Two threads are started: one calls ~run-fibers~ with the created scheduler
;; and spawns the worker fibers, the other runs the work-distributor loop.
;; ~run-fibers~ gets its own thread so that this procedure can return to the
;; caller (the original author notes that ~run-fibers~ blocks).
;;
;; NOTE(review): whether handing a ready-made scheduler to ~run-fibers~ really
;; yields PARALLELISM kernel threads depends on the fibers version -- confirm.
;;
;; Returns the channel-receive of the work-distributor, to be used with
;; ~publish~.
(define* (pool-initializer #:key (parallelism (current-processor-count)))
  (define channel-receive (make-channel))
  (define scheduler (make-scheduler #:parallelism parallelism))

  ;; Spawn one worker fiber per index, counting down from PARALLELISM to 1.
  ;; The fibers are meant to stay alive for as long as the program runs, so
  ;; nothing is spawned into the scheduler later.
  (define (spawn-workers)
    (do ([index parallelism (- index 1)])
        ((zero? index))
      (display "[POOL INIT THREAD]: will spawn fiber ")
      (displayln index)
      (spawn-fiber (lambda () (worker index channel-receive)))
      (displayln "[POOL INIT THREAD]: fiber spawned")))

  ;; Thread body: run the worker fibers in the created scheduler.
  (define (run-workers)
    (run-fibers spawn-workers #:scheduler scheduler)
    (displayln "[POOL INIT]: pool init thread returning"))

  ;; Thread body: run the distributor's message loop.
  (define (run-distributor)
    (work-distributor channel-receive))

  (call-with-new-thread run-workers)
  (displayln "[POOL INIT]: will start work-distributor")
  (call-with-new-thread run-distributor)
  ;; The outside context needs this channel to publish work.
  channel-receive)


;; Hand a piece of work to the pool.
;;
;; WORK-AS-THUNK is the procedure of no arguments to be run by a worker.
;; CHANNEL-RECEIVE is the channel of the work-distributor, as returned by
;; ~pool-initializer~.
;;
;; A fresh return channel is created for this piece of work and sent along
;; with the thunk. Returns that channel; the result of the computation can be
;; taken from it.
(define (publish work-as-thunk channel-receive)
  (let* ([channel-return (make-channel)]
         ;; Shape of the message: (work . (THUNK . RETURN-CHANNEL))
         [tagged-work (cons* 'work work-as-thunk channel-return)]
         [announcement
          (simple-format
           #f "[PUBLISHER]: will publish the following work: ~a\n"
           tagged-work)])
    (display announcement)
    (put-message channel-receive tagged-work)
    (displayln "[PUBLISHER]: work published")
    channel-return))


;; Keep one CPU busy by counting from 0 up to a limit.
;;
;; Optional argument:
;;   ITERATIONS -- the number to count up to. Defaults to 500000000, written
;;                 as an exact integer so that the comparison in the loop does
;;                 not mix an exact counter with an inexact limit.
;;
;; Returns the final counter value, that is the first integer counted from 0
;; that is not less than ITERATIONS (ITERATIONS itself for a non-negative
;; integer, 0 for a negative limit).
(define busy-work
  (lambda* (#:optional (iterations 500000000))
    (let count-up ([i 0])
      (if (< i iterations)
          (count-up (+ i 1))
          i))))


;; Demo of the pool. These are top-level forms, so they are evaluated when
;; this file is loaded.
;; NOTE(review): consider moving the demo out of the module file -- confirm
;; whether anything relies on it running at load time.

;; Create a pool with two workers and keep the channel for publishing work.
(define c-rec (pool-initializer #:parallelism 2))
;; Publish two pieces of work. Each call returns its own return channel;
;; c-ret-2 belongs to the work that is published first.
(define c-ret-2 (publish (lambda () (busy-work)) c-rec))
(define c-ret-1 (publish (lambda () (busy-work)) c-rec))
;; Block until each result has been received. The received values are not
;; bound to anything here.
(get-message c-ret-2)
(get-message c-ret-1)
